---
title: "Part 9: Connection Pool"
---

## Problem

In the [Part 4: Routing](/tutorial/04-routing) tutorial we showed that *muxHTTP* has a connection management feature: connections are stored as key/value pairs, where the value represents the connection. The "connection" here actually refers to the target pipeline, and the *connect* filter used in that pipeline holds the upstream connection. In previous tutorials we used the target server address as the key, which made all requests going to the same target host share the same upstream connection. In other words, there is only one connection per upstream. Pipy's *muxHTTP* provides HTTP/1 multiplexing support, but sending requests and receiving responses sequentially over that single connection inevitably leads to performance issues.

We can do a simple test. Pipy has built-in support for dumping its in-memory state. Sending a *SIGTSTP* signal to the Pipy process makes it print readable information such as the number of classes and instances loaded, the number of pipelines, and so on.

We will use the *wrk* tool for our test and send requests over 5 concurrent connections via `wrk -t 1 -c 5 -d 2s --latency http://localhost:8000/ip`. After *wrk* returns, we send a *SIGTSTP* signal to the Pipy instance by pressing the *Ctrl+Z* key combination.

In the pipeline column, we see that the total and active instances of the *connection* pipeline are both 1, indicating that only one connection is used by Pipy.

```
PIPELINE                             #ALLOCATED  #ACTIVE
----------------------------------------------------------
  /plugins/balancer.js [connection]  1           1
```

In this tutorial we are going to implement resource pooling to manage connections. So let's jump into the fun part.

## ResourcePool

Pipy's `algo.ResourcePool` provides resource pooling functionality. Its constructor takes a function (a.k.a. the resource generator) as a parameter. When there are no free resources in the pool, this generator is called to allocate a new resource.

`ResourcePool` has two public methods:

* `allocate(pool)`: Returns a resource from the resource pool identified by the `pool` parameter.
* `free(resource)`: Frees a resource by returning it to its pool. The `resource` parameter specifies which resource to free.
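To make the allocate/free cycle concrete, here is a minimal plain-JavaScript model of the behavior described above. It is only a sketch and not Pipy's implementation: the class name `SketchResourcePool`, its internal maps, and the target address `127.0.0.1:8080` are all assumptions made for illustration.

```js
// Plain-JavaScript model of a resource pool. NOT Pipy's algo.ResourcePool;
// every name here is hypothetical and exists only for illustration.
class SketchResourcePool {
  constructor(generator) {
    this.generator = generator;   // called only when no free resource exists
    this.freeLists = new Map();   // pool key -> stack of freed resources
    this.owners = new Map();      // allocated resource -> pool key
  }

  // Hand out a freed resource if one exists, otherwise generate a new one.
  allocate(pool) {
    const free = this.freeLists.get(pool) || [];
    const resource = free.length > 0 ? free.pop() : this.generator();
    this.owners.set(resource, pool);
    return resource;
  }

  // Return a resource to the pool it was allocated from.
  free(resource) {
    if (!this.owners.has(resource)) return; // unknown or already freed
    const pool = this.owners.get(resource);
    this.owners.delete(resource);
    if (!this.freeLists.has(pool)) this.freeLists.set(pool, []);
    this.freeLists.get(pool).push(resource);
  }
}

let nextID = 0;
const pool = new SketchResourcePool(() => ++nextID);

const a = pool.allocate('127.0.0.1:8080'); // generator runs, a is 1
const b = pool.allocate('127.0.0.1:8080'); // generator runs, b is 2
pool.free(a);                              // 1 goes back to the pool
const c = pool.allocate('127.0.0.1:8080'); // reuses 1, generator not called
```

The point of the model is that the generator only runs when the free list is empty, so the number of generated IDs is bounded by the peak number of resources in use at the same time.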

``` js
  pipy({
    _services: (
      Object.fromEntries(
        Object.entries(config.services).map(
          ([k, v]) => [
            k, new algo.RoundRobinLoadBalancer(v)
          ]
        )
      )
    ),

    _balancer: null,
    _balancerCache: null,
    _target: '',

+   _g: {
+     connectionID: 0,
+   },

+   _connectionPool: new algo.ResourcePool(
+     () => ++_g.connectionID
+   ),
  })
```

We introduce two new variables, `_g` and `_connectionPool`. A keen reader may ask why the counter used for incrementing is placed inside `_g`.

The argument to `pipy()` is the "root context" of the entire Pipy instance. When some joint filters, such as `use` and `link`, connect to sub-pipelines, they copy a new context from the root context and operate on the global variables in that copy. The `merge` filter, on the other hand, manipulates global variables in their original context, as described later.

Suppose we had declared `connectionID` directly as a global variable with a default value of *0*. As stated above, `use` and similar joint filters copy the context and work on their copy, so changes made there affect only the local copy and are not visible in the global scope. By wrapping the variable inside an object, we copy only the reference, so every context shares the same underlying object.

> This is the difference between passing by value and passing by reference.
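The difference can be illustrated in plain JavaScript. The sketch below assumes that copying a context behaves like a shallow object copy; `rootContext`, `cloneContext`, and `counter` are hypothetical names used only for this example and are not part of Pipy.

```js
// Hypothetical stand-in for a root context.
const rootContext = {
  counter: 0,               // primitive: each copy gets its own value
  _g: { connectionID: 0 },  // object: each copy gets the same reference
};

// Shallow copy, modeling (by assumption) how a sub-pipeline gets a context.
const cloneContext = (ctx) => ({ ...ctx });

const ctxA = cloneContext(rootContext);
const ctxB = cloneContext(rootContext);

ctxA.counter += 1;          // changes only ctxA's copy
ctxA._g.connectionID += 1;  // changes the object shared by all contexts

console.log(ctxB.counter);                // 0
console.log(ctxB._g.connectionID);        // 1
console.log(rootContext._g.connectionID); // 1
```

A plain `counter` would restart from the root value in every copied context, while the counter stored inside `_g` keeps increasing across all of them.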

Going back to the initial test: once the resource pool is in place, there will be five resources in the pool (there is only one upstream target server, so there is only one pool) for the five downstream connections: 1, 2, 3, 4, 5. A downstream connection may get any of these when it fetches a resource from the pool. But we want each downstream connection to keep using the same upstream connection, just as in the [previous article](/tutorial/08-load-balancing-Improved), where requests from the same downstream connection are routed to the same upstream server.

We can use a cache to solve this problem, so let's implement the caching functionality:

## Cache

``` js
  pipy({
    _services: (
      Object.fromEntries(
        Object.entries(config.services).map(
          ([k, v]) => [
            k, new algo.RoundRobinLoadBalancer(v)
          ]
        )
      )
    ),

    _balancer: null,
    _balancerCache: null,
    _target: '',
+   _targetCache: null,

    _g: {
      connectionID: 0,
    },

    _connectionPool: new algo.ResourcePool(
      () => ++_g.connectionID
    ),
  })

  .import({
    __turnDown: 'proxy',
    __serviceID: 'router',
  })

  .pipeline('session')
    .handleStreamStart(
      () => (
+       _targetCache = new algo.Cache(
+         // k is a target, v is a connection ID
+         (k  ) => _connectionPool.allocate(k),
+         (k,v) => _connectionPool.free(v),
+       ),
        _balancerCache = new algo.Cache(
          // k is a balancer, v is a target
          (k  ) => k.select(),
          (k,v) => k.deselect(v),
        )
      )
    )
    .handleStreamEnd(
      () => (
+       _targetCache.clear(),
        _balancerCache.clear()
      )
    )
```

We have introduced a new cache variable, `_targetCache`, and initialize it with the `algo.Cache` constructor, passing it the allocation and deallocation logic. The allocator is invoked when we request a resource for a target that is not yet cached. When the downstream connection is closed (the *StreamEnd* event), the cache is cleared and our deallocator is invoked to put the resource back into the resource pool.
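The interplay between a per-session cache and a shared pool can be modeled in plain JavaScript. This is a sketch under stated assumptions, not Pipy's `algo.Cache`: `SketchCache`, `allocateID`, `freeID`, `makeSessionCache`, and the target address are hypothetical, and the pool is reduced to a counter plus a free list.

```js
// Plain-JavaScript model of a cache with allocate/free callbacks.
// NOT Pipy's algo.Cache; all names are hypothetical.
class SketchCache {
  constructor(allocate, free) {
    this.allocate = allocate; // (key) => value, called on a cache miss
    this.free = free;         // (key, value) => void, called on clear()
    this.entries = new Map();
  }
  get(key) {
    if (!this.entries.has(key)) this.entries.set(key, this.allocate(key));
    return this.entries.get(key);
  }
  clear() {
    for (const [key, value] of this.entries) this.free(key, value);
    this.entries.clear();
  }
}

// Minimal shared pool of connection IDs: reuse freed IDs, else count up.
let lastID = 0;
const freeIDs = [];
const allocateID = () => (freeIDs.length > 0 ? freeIDs.pop() : ++lastID);
const freeID = (id) => freeIDs.push(id);

// One cache per downstream session, all backed by the same pool.
const makeSessionCache = () =>
  new SketchCache(() => allocateID(), (k, v) => freeID(v));

const target = '127.0.0.1:8080'; // hypothetical upstream address
const sessions = Array.from({ length: 5 }, makeSessionCache);

const first = sessions.map((s) => s.get(target));  // [1, 2, 3, 4, 5]
const second = sessions.map((s) => s.get(target)); // same IDs, cache hits

sessions[0].clear();                               // session ends, ID 1 freed
const reused = makeSessionCache().get(target);     // new session reuses ID 1
```

Each session sticks to the ID it first received, and an ID only returns to the pool when its session's cache is cleared, which mirrors what `handleStreamEnd` does in the pipeline above.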

We also need to update the *muxHTTP* filter as below:

``` js
  .pipeline('forward')
    .muxHTTP(
      'connection',
-     () => _target
+     () => _targetCache.get(_target)
    )

  .pipeline('connection')
    .connect(
      () => _target
    )
```

## Test in action

Running the same *wrk* test and memory dump again, the total allocated and active instances of the *connection* pipeline are now both 5, which matches the number of downstream connections.

```
PIPELINE                                  #ALLOCATED  #ACTIVE
---------------------------------------------------------------
  /plugins/balancer.js [connection]       5           5
```

## Summary

In this tutorial we learned how to use `ResourcePool` in conjunction with the *muxHTTP* filter to implement connection pooling. The amount of new code is small; most of this article's length goes into deepening the understanding of global variables and *muxHTTP*.

### Takeaways

1. Use `ResourcePool` to implement resource pooling functionality.
2. Variables that need to be shared across downstream connections must be wrapped in an object, so that they are passed by reference.

### What's next?

Standard routing also requires flexible path handling. For example, `/ip` in the routing table does not exist in the upstream service. Let's implement redirection to solve this problem.
